package com.cloudansys.core.flink.function;

import com.cloudansys.config.DefaultConfig;
import com.cloudansys.core.constant.Const;
import com.cloudansys.core.entity.MultiDataEntity;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import redis.clients.jedis.Jedis;

import java.util.ArrayList;
import java.util.List;
import java.util.Locale;

/**
 * Flink process function that converts strain-type readings relative to a
 * reference point whose current value is stored in Redis.
 *
 * <p>For every element whose serial code is registered under a reference key
 * in {@code Const.SCS_YB_CK}, the stress value (2nd indicator) minus the
 * current reference value is formatted with {@code Const.FORMAT_DOUBLE} and
 * written into the 6th indicator. All elements, converted or not, are emitted
 * downstream as one list per input list.
 */
@Slf4j
public class YBProcessor extends ProcessFunction<List<MultiDataEntity>, List<MultiDataEntity>> {

    /** Index of the stress value (2nd indicator) that is the conversion input. */
    private static final int STRESS_INDEX = 1;

    /** Index of the indicator (6th) that receives the converted value. */
    private static final int CONVERTED_INDEX = 5;

    // Created in open(); transient so the connection is never part of the serialized function.
    private transient Jedis jedis;

    /**
     * Obtains the Redis connection used to look up reference values.
     *
     * @param parameters configuration passed through to the superclass
     * @throws Exception if the superclass initialization fails
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        jedis = DefaultConfig.getJedis();
    }

    /**
     * Subtracts the current value of the matching strain reference point from
     * the stress value (2nd indicator) of each strain-type element and stores
     * the formatted result in the 6th indicator.
     *
     * <p>Elements that cannot be converted (missing or non-numeric reference
     * value, missing stress value, too few indicators) are logged and emitted
     * unchanged.
     *
     * @param elements batch of readings to process
     * @param ctx      Flink processing context (unused)
     * @param out      collector receiving a list holding every input element
     * @throws Exception if the Redis lookup fails
     */
    @Override
    public void processElement(List<MultiDataEntity> elements, Context ctx, Collector<List<MultiDataEntity>> out) throws Exception {
        List<MultiDataEntity> resList = new ArrayList<>(elements.size());
        for (MultiDataEntity element : elements) {
            // Reference point 31: top-fixed reference point.
            applyReference(element, Const.SCS_31);
            // Reference point 32: ground-anchor reference point. Applied second, so it
            // wins when a serial code is registered under both reference keys.
            applyReference(element, Const.SCS_32);
            resList.add(element);
        }
        out.collect(resList);
    }

    /**
     * Releases the Redis connection, even when the superclass close fails.
     *
     * @throws Exception if the superclass close fails
     */
    @Override
    public void close() throws Exception {
        try {
            super.close();
        } finally {
            if (jedis != null) {
                jedis.close();
                jedis = null;
            }
        }
    }

    /**
     * Converts one element against one reference point, if the element's
     * serial code is registered under that reference key.
     */
    private void applyReference(MultiDataEntity element, String referenceKey) {
        String serialCode = element.getSerialCode();
        if (!Const.SCS_YB_CK.get(referenceKey).contains(serialCode)) {
            return;
        }

        Double[] values = element.getValues();
        if (values == null || values.length <= CONVERTED_INDEX || values[STRESS_INDEX] == null) {
            log.warn("Skipping strain conversion for {}: stress value missing or fewer than {} indicators",
                    serialCode, CONVERTED_INDEX + 1);
            return;
        }

        Double reference = readReference(referenceKey);
        if (reference == null) {
            return;
        }

        double processedValue = values[STRESS_INDEX] - reference;
        // Locale.ROOT guarantees a '.' decimal separator so the text can be parsed back.
        values[CONVERTED_INDEX] = Double.parseDouble(
                String.format(Locale.ROOT, Const.FORMAT_DOUBLE, processedValue));
        element.setValues(values);
    }

    /**
     * Reads the current value of a reference point from Redis.
     *
     * @return the parsed value, or {@code null} when the key is absent or not numeric
     */
    private Double readReference(String referenceKey) {
        String raw = jedis.get(referenceKey);
        if (raw == null) {
            log.warn("No reference value found in redis for key {}", referenceKey);
            return null;
        }
        try {
            return Double.valueOf(raw);
        } catch (NumberFormatException e) {
            log.warn("Reference value for key {} is not numeric: {}", referenceKey, raw, e);
            return null;
        }
    }

}
